package read.niit.service


import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.dstream.DStream
import read.niit.bean.Reader
import read.niit.util.{HBaseUtil, SparkUtil}

class ReaderRecommendService {

  val spark = SparkUtil.takeSpark()
  import spark.implicits._
  import org.apache.spark.sql.functions._

  // Default location of the pre-trained ALS model.
  // Use this literal path when HBase does not store a model path.
  private val defaultModelPath =
    "D:\\IDEA_PROJECT\\BD2-Spark\\output\\reader_als_model\\1685439817375"

  /**
   * Consumes a stream of [[Reader]] records, produces top-10 book
   * recommendations per student with a pre-trained ALS model, joins the
   * recommendations back onto the raw records, and appends the result to
   * the MySQL `reader` table.
   *
   * @param reader    stream of reader events (e.g. from Kafka)
   * @param modelPath filesystem path of the saved ALSModel; defaults to
   *                  the bundled model so existing callers are unaffected
   */
  def dataAnalysis(reader: DStream[Reader], modelPath: String = defaultModelPath): Unit = {
    // Load the trained model ONCE at stream setup instead of once per
    // micro-batch — ALSModel.load is loop-invariant and relatively costly.
    val model = ALSModel.load(modelPath)

    // ALS can only store numeric IDs ("学生ID_32" was trained as 32), so the
    // same prefix-stripping must be applied before querying the model.
    val id2Int = udf((student_id: String) => {
      student_id.split("_")(1).toInt
    })

    reader.foreachRDD(rdd => {
      // Skip empty micro-batches: avoids a pointless model query and an
      // empty JDBC append.
      if (!rdd.isEmpty()) {
        // Spark MLlib models consume DataFrames, so convert RDD => DataFrame.
        val readerDF = rdd.toDF()
        val studentIdDF = readerDF.select(id2Int('student_id) as "student_id")

        // Recommend the top 10 books for each student in this batch.
        val recommendDF = model.recommendForUserSubset(studentIdDF, 10)
        recommendDF.printSchema()
        // show(false): do not truncate long recommendation columns.
        recommendDF.show(false)

        // Rebuild the string IDs and flatten each student's recommendations
        // into a single comma-separated string: "id1,id2,id3,…".
        val recommendReDF = recommendDF.as[(Int, Array[(Int, Float)])].map(t => {
          val studentId: String = "学生ID_" + t._1
          val bookId = t._2.map("书ID_" + _._1).mkString(",")
          (studentId, bookId)
        }).toDF("student_id", "recommendations")

        // Join the raw Kafka reader records with their recommendations.
        val allInfoDF = readerDF.join(recommendReDF, "student_id")

        // Persist to the MySQL `reader` table. SaveMode.Append creates the
        // table automatically if it does not yet exist.
        allInfoDF.write
          .format("jdbc")
          .option("url", "jdbc:mysql://node1:3306/BD2?useUnicode=true&characterEncoding=utf8")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("user", "root")
          .option("password", "Niit@123")
          .option("dbtable", "reader")
          .mode(SaveMode.Append)
          .save()
      }
    })
  }

}